python3 自定义logging.Handler, Formatter, Filter模块

您所在的位置:网站首页 python filehandler python3 自定义logging.Handler, Formatter, Filter模块

python3 自定义logging.Handler, Formatter, Filter模块

2023-07-20 07:11| 来源: 网络整理| 查看: 265

在日常使用logging模块中,我们常会使用到官方提供的FileHandler,StreamHandler,RotatingFileHander等,详细参考官方文档, 这些模块都是继承来自与logging.Handler这个父类,而Handler主要用来自定义日志对象的规则(比如:将日志输出到什么地方,哪些日志进行输出、以及日志输出的格式等)。虽然官方提供了很多实现好的Handler,但总有一些特殊情况需要自定义输出日志。比如下面的示例中要将日志输出到kafka集群中,此时我们需要自定义Handler对象。 由于本示例会涉及继承并自定义Handler, Formatter, Filter三个类,所以先描述下本示例的需求。 1)通过Filter类对日志信息进行过滤 2)通过Formatter类对日志信息进行格式化,这里以输出json格式为例 3)通过Handler类将日志输出到kafka

0 常见format参数说明 %(levelno)s: 打印日志级别的数值 %(levelname)s: 打印日志级别名称 %(pathname)s: 打印当前执行程序的路径,其实就是sys.argv[0] %(filename)s: 打印当前执行程序名 %(funcName)s: 打印日志的当前函数 %(lineno)d: 打印日志的当前行号 %(asctime)s: 打印日志的时间 %(thread)d: 打印线程ID %(threadName)s: 打印线程名称 %(process)d: 打印进程ID %(message)s: 打印日志信息 1 实现自定义Handler

首先贴出Handler官方文档,下面继承logging.Handler方法进行自定义输出,主要是去实现emit方法。由于需要连接kafka,所以需要安装kafka-python包。

import json from kafka import KafkaProducer from kafka.errors import KafkaError import logging import datetime class KafkaLoggingHandler(logging.Handler): """ 自定义logging.Handler模块,自定义将日志输出到指定位置(这里是输出到kafka) """ def __init__(self, config=None, topic=None, name=""): super(KafkaLoggingHandler, self).__init__() if isinstance(config, dict) is False: raise ValueError("lack of kafka config parameters...") if isinstance(topic, str) is False: raise ValueError("lack of kafka topic parameters...") self.name = name self.config = config self.producer = KafkaProducer(**self.config) self.topic = topic # 实例化自定义的日志过滤器 filter = KafkaLogFilter() self.addFilter(filter) # 实例化自定义的日志格式化对象 json_format = JsonForMatter() self.setFormatter(json_format) @staticmethod def on_send_success(record_metadata): # 如果消息成功写入Kafka,broker将返回RecordMetadata对象(包含topic,partition和offset print("Success: [{}] send success".format(record_metadata)) @staticmethod def on_send_error(excp): # 如果失败broker将返回error。这时producer收到error会尝试重试发送消息几次,直到producer返回error print("INFO " + "send info failed, cause: {}".format(excp)) def emit(self, record): """ 重写logging.Handler的emit方法 :param record: 传入的日志信息 :return: """ # 对日志信息进行格式化 value = self.format(record) # 转成json格式,注意ensure_ascii参数设置为False,否则中文乱码 value = json.dumps(value, ensure_ascii=False).encode("utf-8") future = self.producer.send(topic=self.topic, value=value) try: record_metadata = future.get(timeout=10) self.on_send_success(record_metadata) except KafkaError as e: self.on_send_error(e)

首先在类的初始化函数中,传入了两个变量,config和topic(name暂不用管),其中config是一个字典类,主要传入连接kafka的集群ip以及端口号,格式如下:

config = { "bootstrap_servers": ["192.168.10.2:9092", "192.168.10.2:9092"] }

topic就是写入kafka的topic字段。接着在emit方法中将传入的日志信息写入kafka。其中self.format就是后面我们会自定义的Formatter模块(后面会讲),对我们的日志信息进行格式化(转成字典类型,方便后续json化)。

2 自定义Formatter

自定义Formatter类主要是为了自定义格式化输出,在下面代码中,主要是重写format方法,将传入的record LogRecord变量类型转为dict类型,在LogRecord变量中存储有很多属性值,这里我们就简单使用其中的filename、lineno、module以及msg这四个属性,然后我们自己在加个时间tm属性。

SAVE_ATTR = ["filename", "lineno", "module", "msg"] class JsonForMatter(logging.Formatter): """ 对日志信息进行自定义格式化 """ def format(self, record): """ 重写logging.Formatter的format方法 :param record: 传入的日志信息 :return: """ msg = self.translate(record) self.set_format_time(msg) return msg # translate LogRecord to dict @staticmethod def translate(record): # 只保留SAVE_ATTR列表中的属性 d = {attr_name: record.__dict__[attr_name] for attr_name in record.__dict__ if attr_name in SAVE_ATTR} return d @staticmethod def set_format_time(msg): now = datetime.datetime.utcnow() format_time = now.strftime("%Y-%m-%d %H:%M:%S" + ".%03d" % (now.microsecond / 1000)) msg["tm"] = format_time 3 自定义Filter

通过自定义Filter类,能够实现哪些日志进行输出,哪些日志直接丢弃。主要是重写filter方法,如果返回True则保留该日志,如果返回False则丢弃该日志。

class KafkaLogFilter(logging.Filter): """ 自定义logging过滤器,决定哪些日志(kafka相关)进行输出 只有通过该过滤器的日志才会触发logging.Handler的emit方法 """ def __init__(self, name="werkzeug"): # werkzeug是WSGI的工具包,其日志中包含着访问端口的记录 super(KafkaLogFilter, self).__init__() self.name = name def filter(self, record): """ 重写logging.Filter的filter方法 只将warning等级和error等级的日志进行输出 :param record: 传入的日志信息 :return: 如果返回true则保留该日志,如果为false则丢弃 """ if record.__dict__["levelname"] in ["WARNING", "ERROR"]: return True # elif record.name == self.name: # 当log的信息来自于werkzeug时进行简单的print # if "args" in record.__dict__: # print(record.__dict__["msg"] % (record.__dict__["args"])) # else: # print(record.__dict__["msg"]) # return False else: return False 使用测试 log = logging.getLogger() kafka = KafkaLoggingHandler() log.setLevel(logging.INFO) log.addHandler(kafka) log.info("test1") # 会被屏蔽 log.warning("test2") # 写入kafka log.error("test3") # 写入kafka


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3